package com.spx.sql;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * create by undeRdoG on  2022-05-10  15:27
 * 凡心所向，素履以往，生如逆旅，一苇以航。
 */
public class SQLTest {

    public static void main(String[] args) {
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        TableEnvironment tabEnv = TableEnvironment.create(environmentSettings);

        // 创建读入表
        String createDDL = "CREATE TABLE clickTable (" +
                "user_name STRING, " +
                "url STRING, " +
                "ts BIGINT" +
                ") WITH (" +
                " 'connector' = 'filesystem' , " +
                " 'path' = 'input/clicks.txt' , " +
                " 'format' = 'csv'"
                + ")";

        // 创建一张用于聚合打印的输出表
        String createPrintAggDDL = "CREATE TABLE printAggTable (" +
                "user_name STRING, " +
                "cnt BIGINT " +
                ") WITH (" +
                " 'connector' = 'print'  "
                + ")";


        tabEnv.executeSql(createDDL);
        tabEnv.executeSql(createPrintAggDDL);

        Table aggResult = tabEnv.sqlQuery("select user_name,count(*) as cnt from clickTable group by user_name");

        aggResult.executeInsert("printAggTable");

    }
}
